[FLINK-4582] [kinesis] Consuming data from DynamoDB streams to flink #6968
@yxu-valleytider Please make sure that the Pull Request title references the corresponding JIRA. I have modified the title accordingly this time.
@yxu-valleytider please include tag
* limitations under the License.
*/

package org.apache.flink.streaming.connectors.dynamodbstreams;
All of the newly added packages should be under org.apache.flink.streaming.connectors.kinesis as they are part of the flink-connector-kinesis module.
Hmm, both the dynamodbstreams and kinesis modules are under the parent module org.apache.flink.streaming.connectors. This is also consistent with the directory structure.
These are just packages within the module flink-connector-kinesis, and the package prefix should reflect that.
In that case I don't mind putting the files into a separate directory, e.g., flink-connector-dynamodbstreams. WDYT?
I don't think that will work: We had already determined in our internal work that we need a single shaded AWS dependency. I don't think it's desirable either, because it is just a slight variation of the Kinesis consumer that does not deserve a separate module.
DynamodbStreamsShardHandle.compareShardIds(shardIdInvalid, shardIdValid);
fail("invalid shard Id" + shardIdInvalid + " should trigger exception");
} catch (IllegalArgumentException e) {
// ignore
this should be // expected instead?
yes // expected clarifies more.
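The try/fail/catch pattern discussed in this thread can be sketched in isolation as follows. This is only an illustration: ShardIdSketch, its "shardId-" prefix rule, and the lexicographic ordering are assumptions made for the example, not the validation that DynamodbStreamsShardHandle actually performs.

```java
// Hypothetical stand-in for the shard handle under review.
final class ShardIdSketch {
    // Assumption: valid ids share this prefix; the real format check may be stricter.
    static final String PREFIX = "shardId-";

    // Compares two shard ids, rejecting ids that fail the (assumed) format check.
    static int compareShardIds(String first, String second) {
        requireValid(first);
        requireValid(second);
        return first.compareTo(second);
    }

    private static void requireValid(String shardId) {
        if (shardId == null || !shardId.startsWith(PREFIX)) {
            throw new IllegalArgumentException("invalid shard id: " + shardId);
        }
    }
}

final class ShardIdSketchCheck {
    // Returns true when comparing against an invalid id raises the expected exception.
    static boolean rejectsInvalidId(String invalid, String valid) {
        try {
            ShardIdSketch.compareShardIds(invalid, valid);
            return false; // no exception was thrown, so the check did not fire
        } catch (IllegalArgumentException e) {
            // expected
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(rejectsInvalidId("bogus", "shardId-0002"));
    }
}
```

The "// expected" comment tells the reader that reaching the catch block is the passing path, which "// ignore" does not convey.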
@@ -73,7 +73,7 @@
 private static final Logger LOG = LoggerFactory.getLogger(KinesisProxy.class);

 /** The actual Kinesis client from the AWS SDK that we will be using to make calls. */
-private final AmazonKinesis kinesisClient;
+protected final AmazonKinesis kinesisClient;
why this modifier change?
It is used inside DynamodbStreamsProxy to execute the describeStream call.
If describeStream moves to KinesisProxy, this change becomes unnecessary.
PTAL @tzulitai
super(streams, deserializer, config);
}

public static <T> FlinkDynamodbStreamsConsumer<T> create(String stream,
Please remove these static create methods, they don't provide any benefit since the constructors are public (which is consistent with the base class).
*
* @return the result of the describe stream operation
*/
private DescribeStreamResult describeStream(String streamName, @Nullable String startShardId)
Unless there is anything specific to DynamoDB in this implementation, move it to KinesisProxy. (DescribeStream is standard Kinesis, and it was used for discovery before listShards got introduced.)
OK to move. In that case describeStream will need to be at least protected in order to be visible to DynamoDBStreamsProxy. By keeping it inside the DynamodbStreamsProxy layer, this logic can stay private and specific to interacting with DynamoDB streams.
Making it protected is fine. Since these are common capabilities of the Kinesis API it is better to keep it in one place.
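The visibility trade-off settled in this thread can be illustrated with a minimal sketch. The class names BaseProxySketch and DynamoProxySketch are hypothetical stand-ins for KinesisProxy and the DynamoDB proxy, and the string result stands in for the real DescribeStreamResult. No AWS SDK call is made here.

```java
// Hypothetical stand-in for the base proxy that owns the shared call.
class BaseProxySketch {
    // protected: hidden from unrelated callers, but reusable by subclasses.
    protected String describeStream(String streamName, String startShardId) {
        return "describe(" + streamName + ", " + startShardId + ")";
    }
}

// Hypothetical stand-in for the DynamoDB-specific proxy.
class DynamoProxySketch extends BaseProxySketch {
    // Shard discovery reuses the inherited call instead of reaching for the client field.
    String discoverShards(String streamName) {
        return describeStream(streamName, null);
    }

    public static void main(String[] args) {
        System.out.println(new DynamoProxySketch().discoverShards("orders"));
    }
}
```

With the shared call living in the base class, the subclass no longer needs direct access to the client field, which is why the modifier change on kinesisClient was questioned.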
* Different tag name to distinguish from "flink.stream.describe.backoff.base"
* since the latter is deprecated.
*/
public static final String DYNAMODB_STREAMS_DESCRIBE_BACKOFF_BASE =
Remove these and remove the @Deprecated annotations from the prior properties.
@tweise are you ok with removing this function KinesisProxy::replaceDeprecatedConsumerKeys as well?
It would be good to modify the method to map/replicate the keys but not remove them (since you need them for dynamo). Add a comment that this is for backward compatibility for the regular proxy that is now using listShards instead of describeStream. We can remove the mapping in a later release.
OK.
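The "map/replicate but do not remove" behavior agreed on above might look roughly like the sketch below. It is not the actual replaceDeprecatedConsumerKeys implementation: the class and method names are hypothetical, the new key name "flink.list.shards.backoff.base" is an assumption, and the choice not to overwrite an explicitly set new key is a design decision made only for this example.

```java
import java.util.Properties;

// Hypothetical helper; not the KinesisProxy method under review.
final class KeyBackfillSketch {
    // Copies the value of oldKey to newKey while leaving oldKey in place.
    // Backward compatibility: the regular proxy reads newKey (listShards),
    // the DynamoDB proxy still reads oldKey (describeStream).
    static Properties backfill(Properties configProps, String oldKey, String newKey) {
        Properties result = new Properties();
        result.putAll(configProps);
        String oldValue = result.getProperty(oldKey);
        // Assumption: an explicitly configured newKey takes precedence.
        if (oldValue != null && !result.containsKey(newKey)) {
            result.setProperty(newKey, oldValue);
        }
        return result;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("flink.stream.describe.backoff.base", "1000");
        Properties mapped = backfill(props,
            "flink.stream.describe.backoff.base",
            "flink.list.shards.backoff.base"); // assumed name of the new key
        System.out.println(mapped);
    }
}
```

Returning a copy keeps the caller's Properties untouched; mutating in place, as the snippet under review does, would work equally well for the mapping itself.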
* Updates the last discovered shard of a subscribed stream; only updates if the update is valid.
*/
@Override
public void advanceLastDiscoveredShardOfStream(String stream, String shardId) {
Rather than duplicating complete logic from the base class, can we just extract what is unique to DynamoDB? That might also eliminate the need to expose subscribedStreamsToLastDiscoveredShardIds?
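One way to read this suggestion is as a small overridable hook: the base class keeps the map and the update logic private, and the subclass overrides only the comparison. The sketch below assumes this reading. All class and method names are hypothetical, and the "shardId-" prefix check stands in for whatever DynamoDB-specific rule the real fetcher applies.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the base data fetcher.
class FetcherSketch {
    // Stays private: subclasses never touch the map directly.
    private final Map<String, String> lastDiscoveredShardIds = new HashMap<>();

    public void advanceLastDiscoveredShardOfStream(String stream, String shardId) {
        String lastSeen = lastDiscoveredShardIds.get(stream);
        if (lastSeen == null || shouldAdvance(shardId, lastSeen)) {
            lastDiscoveredShardIds.put(stream, shardId);
        }
    }

    // Hook: the only part a subclass needs to customize.
    protected boolean shouldAdvance(String shardId, String lastSeen) {
        return shardId.compareTo(lastSeen) > 0;
    }

    public String lastDiscoveredShardOf(String stream) {
        return lastDiscoveredShardIds.get(stream);
    }
}

// Hypothetical stand-in for the DynamoDB-specific fetcher.
class DynamoFetcherSketch extends FetcherSketch {
    @Override
    protected boolean shouldAdvance(String shardId, String lastSeen) {
        // Assumed DynamoDB-specific rule: reject ids without the expected prefix.
        if (!shardId.startsWith("shardId-")) {
            throw new IllegalArgumentException("invalid shard id: " + shardId);
        }
        return super.shouldAdvance(shardId, lastSeen);
    }

    public static void main(String[] args) {
        FetcherSketch fetcher = new DynamoFetcherSketch();
        fetcher.advanceLastDiscoveredShardOfStream("orders", "shardId-0002");
        fetcher.advanceLastDiscoveredShardOfStream("orders", "shardId-0001");
        System.out.println(fetcher.lastDiscoveredShardOf("orders"));
    }
}
```

In this sketch the first shard seen for a stream is stored without going through the hook, so the format rule only applies once there is something to compare against; a real implementation may want to validate that first id as well.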
PTAL @tweise
super(configProps);
}


Remove extra line.
@@ -55,6 +58,12 @@
import java.util.Properties;
import java.util.Random;

import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_BASE;
Let's stay consistent with the existing implementation and not add these imports.
* Boolean to indicate whether to compare/enforce shardId format based on the one defined in
* DynamodbStreamsShardHandle.
*/
public static final String DYNAMODB_STREAMS_SHARDID_FORMAT_CHECK =
This property isn't actually used? If so, please remove.
The property is currently used inside DynamoDBStreamsDataFetcher.
The key is never used to set the property; that's what "not used" refers to. The documentation also does not mention why anyone would use it. If there is no use case for this, let's remove it.
changed the default to true and removed the config property.
Does the PR need to be rebased?
/**
* DynamoDB streams proxy: interface interacting with the DynamoDB streams.
*/
public class DynamodbStreamsProxy extends KinesisProxy {
DynamodbStreamsProxy => DynamoDBStreamsProxy (same may apply elsewhere also)
@@ -469,6 +496,61 @@ private ListShardsResult listShards(String streamName, @Nullable String startSha
return listShardsResults;
}

/**
* Get metainfo for a Dynamodb stream, which contains information about which shards this
Remove DynamoDB reference here. This is applicable to proper Kinesis also.
configProps.setProperty(newKey, configProps.getProperty(deprecatedOldKey));
configProps.remove(deprecatedOldKey);
if (configProps.containsKey(oldKey)) {
LOG.warn("Backfill the property key {} based on the original key {}", newKey, oldKey);
Remove this warning since it isn't applicable any more (the key is expected to be used for DynamoDB).
*
* In the context of 1), the set of configurations needs to be translated to the corresponding
* configurations in the Kinesis listShards API. In the mean time, keep these configs since
* they may be useful in the context of 2), i.e., polling data from a DynamoDB stream.
"may be useful" => "are applicable"
Dependency diff shows the following:
The version of
acbfa2e to d43ba9e
@yxu-valleytider thanks for the contribution!
…pache#6968) Introduces a new Flink source to consume from DynamoDB streams. This new source is built on top of the existing Kinesis connector. It interacts with the DynamoDB streams via a dynamodb-streams-kinesis-adapter client.
What is the purpose of the change
This PR introduces a new Flink source to consume directly from DynamoDB streams. This new source is built on top of the existing Kinesis connector. It interacts with the DynamoDB streams via a dynamodb-streams-kinesis-adapter client.
Brief change log
New data stream can be constructed to directly pull data from DynamoDB streams.
Other changes include:
dynamodbstreams-kinesis adapter client to interact with Dynamodb streams.
describeStream API from the flink-1.5 branch into DynamodbStreamsProxy.
Verifying this change
This change is already covered by most of the existing Flink kinesis connector tests.
Manual tests are provided to verify that the dynamodbstreams connector can:
Does this pull request potentially affect one of the following parts:
Added optional dependency on AWS dynamodb-streams-kinesis-adapter.
@Public(Evolving): (no)
Documentation